package cn.gupao.udfs;

import cn.gupao.pojo.LogBean;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.Map;

/**
 * Evaluates one hard-coded marketing rule against each incoming {@link LogBean}.
 *
 * <p>A message is emitted for a device when all three conditions hold:
 * <ol>
 *   <li>the current event has id {@code "E"} and property {@code p2 = "v1"};</li>
 *   <li>the HBase row keyed by the device id in table {@code eagle_profile} has
 *       {@code f:tag2 = "v1"} and {@code f:tag20 = "v2"};</li>
 *   <li>ClickHouse table {@code eagle_detail} contains at least one event {@code "E"} with
 *       {@code properties['p9'] = "v2"} for that device inside the fixed time window.</li>
 * </ol>
 *
 * <p>Both external connections are created in {@link #open(Configuration)} and released in
 * {@link #close()}; per-element resources (HBase table handle, JDBC statement and result set)
 * are always released, including when a lookup throws.
 */
public class RulesMatchFunction extends KeyedProcessFunction<String, LogBean, String> {

    /** Pattern used to parse the rule's time-window bounds. */
    private static final String TIMESTAMP_PATTERN = "yyyy-MM-dd HH:mm:ss";
    /** Inclusive lower bound of the historical-behaviour window. */
    private static final String WINDOW_START = "2021-11-10 00:00:00";
    /** Inclusive upper bound of the historical-behaviour window. */
    private static final String WINDOW_END = "2022-11-14 23:59:59";

    /** HBase profile table and the cells the rule inspects (encoded once, reused per element). */
    private static final String PROFILE_TABLE = "eagle_profile";
    private static final byte[] PROFILE_FAMILY = Bytes.toBytes("f");
    private static final byte[] QUALIFIER_TAG2 = Bytes.toBytes("tag2");
    private static final byte[] QUALIFIER_TAG20 = Bytes.toBytes("tag20");

    /** Counts the device's matching historical events inside the time window. */
    private static final String HISTORY_COUNT_SQL =
            "select count(*) counts from eagle_detail where deviceId = ? and eventId = ? and properties['p9'] = ? and timeStamp >= ? and timeStamp <= ?";

    // Connections are created in open(); transient so they are never part of the serialized function.
    private transient Connection hbaseConn;
    private transient java.sql.Connection clickHouseConn;

    // Window bounds as epoch milliseconds, parsed once in open() instead of once per element.
    private transient long windowStartMillis;
    private transient long windowEndMillis;

    /**
     * Parses the rule's time window and opens the HBase and ClickHouse connections.
     *
     * @param parameters Flink configuration (not read by this function)
     * @throws Exception if the window bounds cannot be parsed or a connection cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // NOTE(review): the original comment here planned to keep the user's last two hours of
        // data in ListState; no such state is created yet.

        // The bounds are constants, so parse them a single time.
        SimpleDateFormat format = new SimpleDateFormat(TIMESTAMP_PATTERN);
        windowStartMillis = format.parse(WINDOW_START).getTime();
        windowEndMillis = format.parse(WINDOW_END).getTime();

        // Create the HBase connection.
        org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
        hbaseConf.set("hbase.zookeeper.quorum", "bigdata001,bigdata002,bigdata003");
        hbaseConf.setInt("hbase.zookeeper.property.clientPort", 2181);
        hbaseConn = ConnectionFactory.createConnection(hbaseConf);

        // Create the ClickHouse connection.
        clickHouseConn = DriverManager.getConnection("jdbc:clickhouse://bigdata003:8123/default?characterEncoding=utf-8");
    }

    /**
     * Applies the rule to one event and emits a message when the device satisfies it.
     *
     * @param bean the incoming event
     * @param ctx  Flink per-element context (unused)
     * @param out  receives {@code "<deviceId> : 满足了规则"} when the rule matches
     * @throws Exception if the HBase or ClickHouse lookup fails
     */
    @Override
    public void processElement(LogBean bean, Context ctx, Collector<String> out) throws Exception {
        String eventId = bean.getEventId();
        String deviceId = bean.getDeviceId();
        Map<String, String> properties = bean.getProperties();

        // 1. Match the trigger event; an event without properties cannot satisfy the p2 condition.
        if (!"E".equals(eventId) || properties == null || !"v1".equals(properties.get("p2"))) {
            return;
        }
        // Without a device id there is no row key to look up, so the rule cannot match.
        if (deviceId == null || deviceId.isEmpty()) {
            return;
        }

        // 2. Match the user profile (HBase).
        if (!matchesProfile(deviceId)) {
            return;
        }

        // 3. Match the historical behaviour (ClickHouse).
        if (countHistoricalEvents(deviceId) >= 1) {
            out.collect(deviceId + " : 满足了规则");
        }
    }

    /**
     * Closes both connections. Safe to call when {@link #open(Configuration)} did not complete;
     * the ClickHouse connection is closed even if closing the HBase connection throws.
     *
     * @throws Exception if closing either connection fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (hbaseConn != null) {
                hbaseConn.close();
            }
        } finally {
            if (clickHouseConn != null) {
                clickHouseConn.close();
            }
        }
    }

    /** Returns true when the device's profile row has f:tag2 = "v1" and f:tag20 = "v2". */
    private boolean matchesProfile(String deviceId) throws Exception {
        // The table handle is obtained per call, so it must be released per call.
        try (Table table = hbaseConn.getTable(TableName.valueOf(PROFILE_TABLE))) {
            Get get = new Get(Bytes.toBytes(deviceId));
            // Restrict the read to the two cells the rule needs.
            get.addColumn(PROFILE_FAMILY, QUALIFIER_TAG2);
            get.addColumn(PROFILE_FAMILY, QUALIFIER_TAG20);

            Result result = table.get(get);
            if (result == null) {
                return false;
            }
            byte[] value2 = result.getValue(PROFILE_FAMILY, QUALIFIER_TAG2);
            byte[] value20 = result.getValue(PROFILE_FAMILY, QUALIFIER_TAG20);
            if (value2 == null || value20 == null) {
                return false;
            }
            // Bytes.toString decodes as UTF-8 rather than the platform default charset.
            return "v1".equals(Bytes.toString(value2)) && "v2".equals(Bytes.toString(value20));
        }
    }

    /** Returns the number of matching historical events for the device (0 if the query yields no row). */
    private long countHistoricalEvents(String deviceId) throws Exception {
        try (PreparedStatement statement = clickHouseConn.prepareStatement(HISTORY_COUNT_SQL)) {
            statement.setString(1, deviceId);
            statement.setString(2, "E");
            statement.setString(3, "v2");
            statement.setLong(4, windowStartMillis);
            statement.setLong(5, windowEndMillis);
            try (ResultSet resultSet = statement.executeQuery()) {
                // An un-grouped count(*) produces a single row.
                return resultSet.next() ? resultSet.getLong("counts") : 0L;
            }
        }
    }
}
